package com.huan.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce 中 Counter 的用法(全局计数器)
 * <pre>
 *      // 获取一个 Counter
 *      Counter counter = context.getCounter("自定义Counter", "Java Cnt");
 *      // 计数器+1
 *      counter.increment(1);
 * </pre>
 *
 * @author huan.fu
 * @date 2023/7/8 - 12:26
 */
public class CounterDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // 构建配置对象
        Configuration configuration = new Configuration();
        // 使用 ToolRunner 提交程序
        int status = ToolRunner.run(configuration, new CounterDriver(), args);
        // 退出程序
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 构建Job对象实例 参数（配置对象，Job对象名称）
        Job job = Job.getInstance(getConf(), "wordCont");
        // 设置mr程序运行的主类
        job.setJarByClass(CounterDriver.class);
        // 设置mr程序运行的 mapper类型和reduce类型
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 指定mapper阶段输出的kv数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reduce阶段输出的kv数据类型，业务mr程序输出的最终类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 配置本例子中的输入数据路径和输出数据路径，默认输入输出组件为： TextInputFormat和TextOutputFormat
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 是指 reduce task的个数（会导致输出的文件个数和reduce task的个数一样）
        job.setNumReduceTasks(2);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
